package cn.enjoy.rocketmq.transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 事务监听
 *
 * @author K
 * @date 2021/6/8 上午9:49
 */

@Component
@RocketMQTransactionListener
public class RocketTransactionListener implements RocketMQLocalTransactionListener  {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();


    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        Object id = message.getHeaders().get("id");
        String destination = o.toString();
        localTrans.put(id, destination);
        org.apache.rocketmq.common.message.Message msg = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), StandardCharsets.UTF_8.name(), destination, message);
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return RocketMQLocalTransactionState.COMMIT;
        } else if (StringUtils.contains(tags, "TagB")) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        return RocketMQLocalTransactionState.COMMIT;
    }

}
